/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.bff.gaia.unified.sdk.io.gcp.bigquery;

import static com.bff.gaia.unified.sdk.io.FileSystems.match;
import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkNotNull;

import com.bff.gaia.unified.sdk.coders.Coder;
import com.bff.gaia.unified.sdk.io.AvroSource;
import com.bff.gaia.unified.sdk.io.BoundedSource;
import com.bff.gaia.unified.sdk.io.fs.MatchResult;
import com.bff.gaia.unified.sdk.io.fs.ResourceId;
import com.bff.gaia.unified.sdk.options.PipelineOptions;
import com.bff.gaia.unified.sdk.transforms.SerializableFunction;
import com.bff.gaia.unified.vendor.guava.com.google.common.base.Function;
import com.bff.gaia.unified.vendor.guava.com.google.common.base.Supplier;
import com.bff.gaia.unified.vendor.guava.com.google.common.base.Suppliers;
import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableList;
import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Lists;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An abstract {@link BoundedSource} to read a table from BigQuery.
 *
 * <p>This source uses a BigQuery export job to take a snapshot of the table on GCS, and then reads
 * in parallel from each produced file. It is implemented by {@link BigQueryTableSource} and {@link
 * BigQueryQuerySource}, depending on the configuration of the read. Specifically,
 *
 * <ul>
 *   <li>{@link BigQueryTableSource} is for reading BigQuery tables
 *   <li>{@link BigQueryQuerySource} is for querying BigQuery tables
 * </ul>
 *
 * ...
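 *
 * <p>A rough usage sketch for a concrete subclass; {@code desiredBundleSizeBytes} and {@code
 * options} are illustrative placeholders, and {@code TableRow} is just one possible element type:
 *
 * <pre>{@code
 * BigQuerySourceBase<TableRow> source = ...; // e.g. a BigQueryTableSource
 * // Triggers the export job (at most once; the result is cached across calls):
 * List<BoundedSource<TableRow>> bundles = source.split(desiredBundleSizeBytes, options);
 * // Each bundle is an AvroSource over one exported file and can be read in parallel.
 * // Note: calling createReader() on this source directly throws UnsupportedOperationException.
 * }</pre>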

 */
abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
  private static final Logger LOG = LoggerFactory.getLogger(BigQuerySourceBase.class);

  // The maximum number of retries to poll a BigQuery job.
  protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;

  protected final String stepUuid;
  protected final BigQueryServices bqServices;

  // Cached result of split(), so repeated calls do not launch additional extract jobs.
  private transient List<BoundedSource<T>> cachedSplitResult;
  private final SerializableFunction<SchemaAndRecord, T> parseFn;
  private final Coder<T> coder;

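  /**
   * @param stepUuid a UUID identifying this step, used to name the extract job and temp files
   * @param bqServices the {@link BigQueryServices} facade used to reach the BigQuery API
   * @param coder the coder for elements produced by this source
   * @param parseFn converts each exported Avro record, paired with the table schema, into a T
   */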
  BigQuerySourceBase(
      String stepUuid,
      BigQueryServices bqServices,
      Coder<T> coder,
      SerializableFunction<SchemaAndRecord, T> parseFn) {
    this.stepUuid = checkNotNull(stepUuid, "stepUuid");
    this.bqServices = checkNotNull(bqServices, "bqServices");
    this.coder = checkNotNull(coder, "coder");
    this.parseFn = checkNotNull(parseFn, "parseFn");
  }

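  /** The result of an export: the table schema plus the files the extract job produced. */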
  protected static class ExtractResult {
    public final TableSchema schema;
    public final List<ResourceId> extractedFiles;
    @Nullable public List<MatchResult.Metadata> metadata = null;

    public ExtractResult(TableSchema schema, List<ResourceId> extractedFiles) {
      this(schema, extractedFiles, null);
    }

    public ExtractResult(
        TableSchema schema,
        List<ResourceId> extractedFiles,
        @Nullable List<MatchResult.Metadata> metadata) {
      this.schema = schema;
      this.extractedFiles = extractedFiles;
      this.metadata = metadata;
    }
  }

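  /**
   * Runs a BigQuery export job against the table returned by {@link #getTableToExtract} and
   * returns its schema together with the Avro files written under the temporary extract location.
   */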
  protected ExtractResult extractFiles(PipelineOptions options) throws Exception {
    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
    TableReference tableToExtract = getTableToExtract(bqOptions);
    BigQueryServices.DatasetService datasetService = bqServices.getDatasetService(bqOptions);
    Table table = datasetService.getTable(tableToExtract);
    if (table == null) {
      throw new IOException(
          String.format(
              "Cannot start an export job since table %s does not exist",
              BigQueryHelpers.toTableSpec(tableToExtract)));
    }

    TableSchema schema = table.getSchema();
    BigQueryServices.JobService jobService = bqServices.getJobService(bqOptions);
    String extractJobId =
        BigQueryHelpers.getExtractJobId(
            BigQueryHelpers.createJobIdToken(options.getJobName(), stepUuid));
    final String extractDestinationDir =
        BigQueryHelpers.resolveTempLocation(
            bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
    String bqLocation =
        BigQueryHelpers.getDatasetLocation(
            datasetService, tableToExtract.getProjectId(), tableToExtract.getDatasetId());
    List<ResourceId> tempFiles =
        executeExtract(
            extractJobId,
            tableToExtract,
            jobService,
            bqOptions.getProject(),
            extractDestinationDir,
            bqLocation);
    return new ExtractResult(schema, tempFiles);
  }

  @Override
  public List<BoundedSource<T>> split(long desiredBundleSizeBytes, PipelineOptions options)
      throws Exception {
    // split() can be called multiple times; e.g. the Dataflow runner may call it again with a
    // different desiredBundleSizeBytes if the first call produces too many sources. We ignore
    // desiredBundleSizeBytes, but we must not start another BigQuery extract job on repeated
    // split() calls, so the result is computed once and cached.
    if (cachedSplitResult == null) {
      ExtractResult res = extractFiles(options);
      LOG.info("Extract job produced {} files", res.extractedFiles.size());

      if (!res.extractedFiles.isEmpty()) {
        BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
        final String extractDestinationDir =
            BigQueryHelpers.resolveTempLocation(
                bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
        // Match all files in the destination directory to stat them in bulk.
        List<MatchResult> matches = match(ImmutableList.of(extractDestinationDir + "*"));
        if (!matches.isEmpty()) {
          res.metadata = matches.get(0).metadata();
        }
      }
      cleanupTempResource(options.as(BigQueryOptions.class));
      cachedSplitResult =
          checkNotNull(createSources(res.extractedFiles, res.schema, res.metadata));
    }
    return cachedSplitResult;
  }

  protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception;

  protected abstract void cleanupTempResource(BigQueryOptions bqOptions) throws Exception;

  @Override
  public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
    throw new UnsupportedOperationException("BigQuery source must be split before being read");
  }

  @Override
  public void validate() {
    // Do nothing, validation is done in BigQuery.Read.
  }

  @Override
  public Coder<T> getOutputCoder() {
    return coder;
  }

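  /**
   * Starts a single extract job that exports {@code table} as Avro to {@code
   * extractDestinationDir}, polls it until it finishes, and returns the paths of the files it
   * wrote. Throws if the job completes in any state other than SUCCEEDED.
   */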
  private List<ResourceId> executeExtract(
      String jobId,
      TableReference table,
      BigQueryServices.JobService jobService,
      String executingProject,
      String extractDestinationDir,
      String bqLocation)
      throws InterruptedException, IOException {

    JobReference jobRef =
        new JobReference().setProjectId(executingProject).setLocation(bqLocation).setJobId(jobId);

    String destinationUri = BigQueryIO.getExtractDestinationUri(extractDestinationDir);
    JobConfigurationExtract extract =
        new JobConfigurationExtract()
            .setSourceTable(table)
            .setDestinationFormat("AVRO")
            .setDestinationUris(ImmutableList.of(destinationUri));

    LOG.info("Starting BigQuery extract job: {}", jobId);
    jobService.startExtractJob(jobRef, extract);
    Job extractJob = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
    if (BigQueryHelpers.parseStatus(extractJob) != BigQueryHelpers.Status.SUCCEEDED) {
      throw new IOException(
          String.format(
              "Extract job %s failed, status: %s.",
              extractJob.getJobReference().getJobId(),
              BigQueryHelpers.statusToPrettyString(extractJob.getStatus())));
    }

    LOG.info("BigQuery extract job completed: {}", jobId);

    return BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
  }

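  /**
   * Deserializes a {@link TableSchema} from its JSON representation. Serializable so it can be
   * captured by the memoizing {@link Supplier} in {@link #createSources}.
   */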
  private static class TableSchemaFunction implements Serializable, Function<String, TableSchema> {
    @Nullable
    @Override
    public TableSchema apply(@Nullable String input) {
      return BigQueryHelpers.fromJsonString(input, TableSchema.class);
    }
  }

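  /**
   * Creates one {@link AvroSource} per extracted file, wrapping {@code parseFn} so that each Avro
   * {@link GenericRecord} is paired with the table schema, which is deserialized lazily via a
   * memoized supplier.
   */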
  List<BoundedSource<T>> createSources(
      List<ResourceId> files, TableSchema schema, @Nullable List<MatchResult.Metadata> metadata)
      throws IOException, InterruptedException {

    final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(schema);
    SerializableFunction<GenericRecord, T> fnWrapper =
        new SerializableFunction<GenericRecord, T>() {
          private Supplier<TableSchema> schema =
              Suppliers.memoize(
                  Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(jsonSchema)));

          @Override
          public T apply(GenericRecord input) {
            return parseFn.apply(new SchemaAndRecord(input, schema.get()));
          }
        };

    List<BoundedSource<T>> avroSources = Lists.newArrayList();
    // If metadata is available, create AvroSources with that metadata in SINGLE_FILE_OR_SUBRANGE
    // mode.
    if (metadata != null) {
      for (MatchResult.Metadata file : metadata) {
        avroSources.add(AvroSource.from(file).withParseFn(fnWrapper, getOutputCoder()));
      }
    } else {
      for (ResourceId file : files) {
        avroSources.add(AvroSource.from(file.toString()).withParseFn(fnWrapper, getOutputCoder()));
      }
    }
    return ImmutableList.copyOf(avroSources);
  }

}